package org.meichao.demo.service.csvJob;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;

/**
 * Template for a multi-threaded CSV import: one producer thread reads the file line by line
 * into a bounded queue, and {@link #THREAD_SIZE} consumer threads drain the queue in batches
 * of up to {@link #SINGLE_SIZE} lines, handing each batch to {@link #saveJob(List)}.
 *
 * <p>Subclasses supply the persistence logic by implementing {@link #saveJob(List)}.
 */
@Service
public abstract class CsvJobService {

    private static final Logger LOGGER = LoggerFactory.getLogger(CsvJobService.class);

    /**
     * Capacity of the bounded queue that buffers lines between the reader and the consumers.
     */
    public static final int DEQUE_SIZE = 10000;

    /**
     * Number of consumer threads.
     */
    public static final int THREAD_SIZE = 16;

    /**
     * Maximum number of lines handed to a single {@link #saveJob(List)} call.
     */
    public static final int SINGLE_SIZE = 1000;

    /**
     * Imports a CSV file and blocks until every line has been handed to {@link #saveJob(List)}
     * (or the calling thread is interrupted).
     *
     * <p>Read failures and consumer failures are logged, not thrown. If the calling thread is
     * interrupted while waiting, the interrupt flag is restored and all worker threads are
     * cancelled.
     *
     * @param filePath path of the file to read
     * @param charSet  name of the file encoding; {@code null} or blank means UTF-8
     */
    public void importCVSFile(String filePath, String charSet) {
        BlockingDeque<String> deque = new LinkedBlockingDeque<>(DEQUE_SIZE);
        long startTime = System.currentTimeMillis();

        ExecutorService product = Executors.newSingleThreadExecutor();
        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_SIZE);
        int jobSum = 0;
        try {
            // Producer: reads the file and feeds the queue. Its Future doubles as the
            // "no more input" signal for the consumers.
            Future<?> producer = product.submit(() -> produce(filePath, charSet, deque));

            CountDownLatch countDownLatch = new CountDownLatch(THREAD_SIZE);
            ExecutorCompletionService<Integer> executorCompletionService =
                    new ExecutorCompletionService<>(executorService);
            for (int i = 0; i < THREAD_SIZE; i++) {
                executorCompletionService.submit(new Customer(deque, countDownLatch, producer));
            }

            // Every consumer counts down in a finally block, so this cannot hang on a failed one.
            countDownLatch.await();
            for (int i = 0; i < THREAD_SIZE; i++) {
                try {
                    jobSum += executorCompletionService.take().get();
                } catch (ExecutionException e) {
                    // One failed consumer must not hide the counts of the others.
                    LOGGER.error("------消费者线程执行失败", e.getCause());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error("------导入任务被中断", e);
        } finally {
            // shutdownNow also releases a producer that is still blocked on a full queue
            // (e.g. because every consumer failed); it is a no-op for finished tasks.
            product.shutdownNow();
            executorService.shutdownNow();
        }

        long endTime = System.currentTimeMillis();
        LOGGER.info("导入任务完成,总数据：{}条,用时：{}秒", jobSum, (endTime - startTime) / 1000);
    }

    /**
     * Producer body: reads {@code filePath} line by line and puts every line into {@code deque},
     * blocking while the queue is full. Failures are logged; nothing is thrown.
     */
    private static void produce(String filePath, String charSet, BlockingDeque<String> deque) {
        try (BufferedReader reader = openReader(filePath, charSet)) {
            int count = 0;
            String line;
            while ((line = reader.readLine()) != null) {
                deque.put(line);
                count++;
            }
            LOGGER.info("------生产者完成：总数据：{} 条", count);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error("------生产者被中断", e);
        } catch (Exception e) {
            // Thread boundary: also covers invalid paths and unknown charset names.
            LOGGER.error("------读取文件异常 e:{}", e.toString(), e);
        }
    }

    /**
     * Opens {@code filePath} for reading with the requested encoding, defaulting to UTF-8 when
     * {@code charSet} is {@code null} or blank.
     */
    private static BufferedReader openReader(String filePath, String charSet) throws IOException {
        Charset charset = (charSet == null || charSet.trim().isEmpty())
                ? StandardCharsets.UTF_8
                : Charset.forName(charSet.trim());
        return Files.newBufferedReader(Paths.get(filePath), charset);
    }

    /**
     * Consumer: drains lines from the queue and passes them to {@link #saveJob(List)} in batches
     * of up to {@link #SINGLE_SIZE}. Returns the number of lines it handed over.
     */
    class Customer implements Callable<Integer> {

        private final BlockingDeque<String> deque;
        private final CountDownLatch countDownLatch;
        /** Producer task, or {@code null} when end of input is inferred from a poll timeout. */
        private final Future<?> producer;

        /**
         * Creates a consumer that stops at the first one-second poll timeout.
         *
         * @param deque          queue to drain
         * @param countDownLatch latch counted down once when this consumer ends
         */
        public Customer(BlockingDeque<String> deque, CountDownLatch countDownLatch) {
            this(deque, countDownLatch, null);
        }

        /**
         * Creates a consumer that keeps polling until {@code producer} is done and the queue is
         * empty.
         *
         * @param deque          queue to drain
         * @param countDownLatch latch counted down once when this consumer ends
         * @param producer       task feeding the queue; {@code null} falls back to stopping at
         *                       the first poll timeout
         */
        public Customer(BlockingDeque<String> deque, CountDownLatch countDownLatch, Future<?> producer) {
            this.deque = deque;
            this.countDownLatch = countDownLatch;
            this.producer = producer;
        }

        @Override
        public Integer call() throws Exception {
            int sum = 0;
            List<String> batch = new ArrayList<>(SINGLE_SIZE);
            try {
                for (;;) {
                    // Sample the producer state BEFORE polling: if it was already done and the
                    // poll still comes back empty, no further line can ever arrive.
                    boolean producerFinished = producer == null || producer.isDone();
                    String line = deque.poll(1, TimeUnit.SECONDS);
                    if (line == null) {
                        if (producerFinished) {
                            break;
                        }
                        continue;
                    }
                    batch.add(line);
                    if (batch.size() >= SINGLE_SIZE) {
                        sum += flush(batch);
                    }
                }
                sum += flush(batch);
                return sum;
            } finally {
                // Always release the latch, even when saveJob throws or the thread is interrupted.
                countDownLatch.countDown();
            }
        }

        /**
         * Hands a non-empty batch to {@link #saveJob(List)}, clears it, and returns its size;
         * an empty batch is skipped and yields 0.
         */
        private int flush(List<String> batch) {
            if (batch.isEmpty()) {
                return 0;
            }
            int size = batch.size();
            saveJob(batch);
            batch.clear();
            return size;
        }
    }

    /**
     * Processes one batch of raw lines.
     *
     * <p>Called concurrently from up to {@link #THREAD_SIZE} consumer threads. The list is
     * reused and cleared by the caller right after this method returns, so implementations
     * must not keep a reference to it.
     *
     * @param list non-empty batch of at most {@link #SINGLE_SIZE} lines
     */
    public abstract void saveJob(List<String> list);

}
